/*
 * @Author: lokei
 * @Date: 2022-08-28 22:46:40
 * @LastEditors: lokei
 * @LastEditTime: 2022-09-03 09:45:00
 * @Description: 
 */
package cn.lokei.gateway.config.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;

@Configuration
@PropertySource(value = { "classpath:kafka/kafka.properties" })
public class KafkaProducerConfig {
	// bootstrap
	@Value("${kafka.bootstrap-servers}")
	private String bootstrap_servers_config;
	@Value("${kafka.producer.retries}")
	private String retries_config;
	@Value("${kafka.producer.batch-size}")
	private String batch_size_config;
	@Value("${kafka.producer.linger-ms}")
	private String linger_ms_config;
	@Value("${kafka.producer.buffer-memory}")
	private String buffer_memory_config;

	@Bean
	public KafkaTemplate<String, Object> kafkaTemplate() {
		HashMap<String, Object> configs = new HashMap<>();
		configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers_config);
		configs.put(ProducerConfig.RETRIES_CONFIG, retries_config);
		configs.put(ProducerConfig.BATCH_SIZE_CONFIG, batch_size_config);
		configs.put(ProducerConfig.LINGER_MS_CONFIG, linger_ms_config);
		configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, buffer_memory_config);
		configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
		// 设置序列化
		configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		// 设置自定义分区
		DefaultKafkaProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<String, Object>(configs);
		return new KafkaTemplate<String, Object>(producerFactory);
	}
}